{
 "cells": [
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Batch Prediction with Ray Core\n",
    "\n",
    "<a id=\"try-anyscale-quickstart-batch_prediction\" href=\"https://console.anyscale.com/register/ha?render_flow=ray&utm_source=ray_docs&utm_medium=docs&utm_campaign=batch_prediction\">\n",
    "    <img src=\"../../_static/img/run-on-anyscale.svg\" alt=\"try-anyscale-quickstart\">\n",
    "</a>\n",
    "<br></br>\n",
    "\n",
    "```{note}\n",
    "For a higher level API for batch inference on large datasets, see [batch inference with Ray Data](batch_inference_home). This example is for users who want more control over data sharding and execution.\n",
    "```\n",
    "\n",
    "The batch prediction is the process of using a trained model to generate predictions for a collection of observations. It has the following elements:\n",
    "* Input dataset: this is a collection of observations to generate predictions for. The data is usually stored in an external storage system like S3, HDFS or database, and can be large.\n",
    "* ML model: this is a trained ML model which is usually also stored in an external storage system.\n",
    "* Predictions: these are the outputs when applying the ML model on observations. The predictions are usually written back to the storage system.\n",
    "\n",
    "With Ray, you can build scalable batch prediction for large datasets at high prediction throughput. Ray Data provides a [higher-level API for offline batch inference](batch_inference_home), with built-in optimizations. However, for more control, you can use the lower-level Ray Core APIs. This example demonstrates batch inference with Ray Core by splitting the dataset into disjoint shards and executing them in parallel, with either Ray Tasks or Ray Actors across a Ray Cluster."
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Task-based batch prediction\n",
    "\n",
    "With Ray tasks, you can build a batch prediction program in this way:\n",
    "1. Loads the model\n",
    "2. Launches Ray tasks, with each taking in the model and a shard of input dataset\n",
    "3. Each worker executes predictions on the assigned shard, and writes out results\n",
    "\n",
    "Let’s take NYC taxi data in 2009 for example. Suppose we have this simple model:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import pandas as pd\n",
    "import numpy as np\n",
    "\n",
    "def load_model():\n",
    "    # A dummy model.\n",
    "    def model(batch: pd.DataFrame) -> pd.DataFrame:\n",
    "        # Dummy payload so copying the model will actually copy some data\n",
    "        # across nodes.\n",
    "        model.payload = np.zeros(100_000_000)\n",
    "        return pd.DataFrame({\"score\": batch[\"passenger_count\"] % 2 == 0})\n",
    "    \n",
    "    return model"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The dataset has 12 files (one for each month) so we can naturally have each Ray task to take one file. By taking the model and a shard of input dataset (i.e. a single file), we can define a Ray remote task for prediction:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import pyarrow.parquet as pq\n",
    "import ray\n",
    "\n",
    "@ray.remote\n",
    "def make_prediction(model, shard_path):\n",
    "    df = pq.read_table(shard_path).to_pandas()\n",
    "    result = model(df)\n",
    "\n",
    "    # Write out the prediction result.\n",
    "    # NOTE: unless the driver will have to further process the\n",
    "    # result (other than simply writing out to storage system),\n",
    "    # writing out at remote task is recommended, as it can avoid\n",
    "    # congesting or overloading the driver.\n",
    "    # ...\n",
    "\n",
    "    # Here we just return the size about the result in this example.\n",
    "    return len(result)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The driver launches all tasks for the entire input dataset. "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# 12 files, one for each remote task.\n",
    "input_files = [\n",
    "        f\"s3://anonymous@air-example-data/ursa-labs-taxi-data/downsampled_2009_full_year_data.parquet\"\n",
    "        f\"/fe41422b01c04169af2a65a83b753e0f_{i:06d}.parquet\"\n",
    "        for i in range(12)\n",
    "]\n",
    "\n",
    "# ray.put() the model just once to local object store, and then pass the\n",
    "# reference to the remote tasks.\n",
    "model = load_model()\n",
    "model_ref = ray.put(model)\n",
    "\n",
    "result_refs = []\n",
    "\n",
    "# Launch all prediction tasks.\n",
    "for file in input_files:\n",
    "    # Launch a prediction task by passing model reference and shard file to it.\n",
    "    # NOTE: it would be highly inefficient if you are passing the model itself\n",
    "    # like make_prediction.remote(model, file), which in order to pass the model\n",
    "    # to remote node will ray.put(model) for each task, potentially overwhelming\n",
    "    # the local object store and causing out-of-disk error.\n",
    "    result_refs.append(make_prediction.remote(model_ref, file))\n",
    "\n",
    "results = ray.get(result_refs)\n",
    "\n",
    "# Let's check prediction output size.\n",
    "for r in results:\n",
    "    print(\"Prediction output size:\", r)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "In order to not overload the cluster and cause OOM, we can control the parallelism by setting the proper resource requirement for tasks, see details about this design pattern in {doc}`/ray-core/patterns/limit-running-tasks`.\n",
    "For example, if it's easy for your to get a good estimate of the in-memory size for data loaded from external storage, you can control the parallelism by specifying the amount of memory needed for each task, e.g. launching tasks with ``make_prediction.options(memory=100*1023*1025).remote(model_ref, file)``. Ray will then do the right thing and make sure tasks scheduled to a node will not exceed its total memory."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "```{tip}\n",
    "To avoid repeatedly storing the same model into object store (this can cause Out-of-disk for driver node), use ray.put() to store the model once, and then pass the reference around.\n",
    "```\n",
    "```{tip}\n",
    "To avoid congest or overload the driver node, it’s preferable to have each task to write out the predictions (instead of returning results back to driver which actualy does nothing but write out to storage system).\n",
    "```\n",
    "\n",
    "## Actor-based batch prediction\n",
    "In the above solution, each Ray task will have to fetch the model from the driver node before it can start performing the prediction. This is an overhead cost that can be significant if the model size is large. We can optimize it by using Ray actors, which will fetch the model just once and reuse it for all tasks assigned to the actor.\n",
    "\n",
    "First, we define a callable class that’s structured with an interface (i.e. constructor) to load/cache the model, and the other to take in a file and perform prediction."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import pandas as pd\n",
    "import pyarrow.parquet as pq\n",
    "import ray\n",
    "\n",
    "@ray.remote\n",
    "class BatchPredictor:\n",
    "    def __init__(self, model):\n",
    "        self.model = model\n",
    "        \n",
    "    def predict(self, shard_path):\n",
    "        df = pq.read_table(shard_path).to_pandas()\n",
    "        result =self.model(df)\n",
    "\n",
    "        # Write out the prediction result.\n",
    "        # NOTE: unless the driver will have to further process the\n",
    "        # result (other than simply writing out to storage system),\n",
    "        # writing out at remote task is recommended, as it can avoid\n",
    "        # congesting or overloading the driver.\n",
    "        # ...\n",
    "\n",
    "        # Here we just return the size about the result in this example.\n",
    "        return len(result)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The constructor is called only once per actor worker. We use ActorPool to manage a set of actors that can receive prediction requests."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from ray.util.actor_pool import ActorPool\n",
    "\n",
    "model = load_model()\n",
    "model_ref = ray.put(model)\n",
    "num_actors = 4\n",
    "actors = [BatchPredictor.remote(model_ref) for _ in range(num_actors)]\n",
    "pool = ActorPool(actors)\n",
    "input_files = [\n",
    "        f\"s3://anonymous@air-example-data/ursa-labs-taxi-data/downsampled_2009_full_year_data.parquet\"\n",
    "        f\"/fe41422b01c04169af2a65a83b753e0f_{i:06d}.parquet\"\n",
    "        for i in range(12)\n",
    "]\n",
    "for file in input_files:\n",
    "    pool.submit(lambda a, v: a.predict.remote(v), file)\n",
    "while pool.has_next():\n",
    "    print(\"Prediction output size:\", pool.get_next())"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Note that the ActorPool is fixed in size, unlike task-based approach where the number of parallel tasks can be dynamic (as long as it's not exceeding max_in_flight_tasks). To have autoscaling actor pool, you will need to use the [Ray Data batch prediction](batch_inference_home)."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Batch prediction with GPUs\n",
    "\n",
    "If your cluster has GPU nodes and your predictor can utilize the GPUs, you can direct the tasks or actors to those GPU nodes by specifying num_gpus. Ray will schedule them onto GPU nodes accordingly. On the node, you will need to move the model to GPU. The following is an example for Torch model."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import torch\n",
    "\n",
    "@ray.remote(num_gpus=1)\n",
    "def make_torch_prediction(model: torch.nn.Module, shard_path):\n",
    "    # Move model to GPU.\n",
    "    model.to(torch.device(\"cuda\"))\n",
    "    inputs = pq.read_table(shard_path).to_pandas().to_numpy()\n",
    "\n",
    "    results = []\n",
    "    # for each tensor in inputs:\n",
    "    #   results.append(model(tensor))\n",
    "    #\n",
    "    # Write out the results right in task instead of returning back\n",
    "    # to the driver node (unless you have to), to avoid congest/overload\n",
    "    # driver node.\n",
    "    # ...\n",
    "\n",
    "    # Here we just return simple/light meta information.\n",
    "    return len(results)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## FAQs\n",
    "\n",
    "### How to load and pass model efficiently in Ray cluster if the model is large?\n",
    "The recommended way is to (taking task-based batch prediction for example, the actor-based is the same):\n",
    "1. let the driver load the model (e.g. from storage system)\n",
    "2. let the driver ray.put(model) to store the model into object store; and\n",
    "3. pass the same reference of the model to each remote tasks when launching them.\n",
    "The remote task will fetch the model (from driver's object store) to its local object store before start performing prediction.\n",
    "\n",
    "Note it's highly inefficient if you skip the step 2 and pass the model (instead of reference) to remote tasks. If the model is large and there are many tasks, it'll likely cause out-of-disk crash for the driver node."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# GOOD: the model will be stored to driver's object store only once\n",
    "model = load_model()\n",
    "model_ref = ray.put(model)\n",
    "for file in input_files:\n",
    "    make_prediction.remote(model_ref, file)\n",
    "\n",
    "# BAD: the same model will be stored to driver's object store repeatedly for each task\n",
    "model = load_model()\n",
    "for file in input_files:\n",
    "    make_prediction.remote(model, file)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "For more details, check out {doc}`/ray-core/patterns/pass-large-arg-by-value`."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### How to improve the GPU utilization rate?\n",
    "To keep GPUs busy, there are following things to look at:\n",
    "- **Schedule multiple tasks on the same GPU node if it has multiple GPUs**: If there are multiple GPUs on same node and a single task cannot use them all, you can direct multiple tasks to the node. This is automatically handled by Ray, e.g. if you specify num_gpus=1 and there are 4 GPUs, Ray will schedule 4 tasks to the node, provided there are enough tasks and no other resource constraints.\n",
    "- **Use actor-based approach**: as mentioned above, actor-based approach is more efficient because it reuses model initialization for many tasks, so the node will spend more time on the actual workload."
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3.8.10 ('venv': venv)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.8.10"
  },
  "orig_nbformat": 4,
  "vscode": {
   "interpreter": {
    "hash": "3c0d54d489a08ae47a06eae2fd00ff032d6cddb527c382959b7b2575f6a8167f"
   }
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
